﻿using iTool.Common.Options;
using Orleans;
using Orleans.Concurrency;
using Orleans.Providers;
using Orleans.Runtime;
using Orleans.Storage;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Linq;
using System.Threading.Tasks;

namespace iTool.ClusterComponent
{
    /// <summary>
    /// Grain contract for a string set addressed by a string key.
    /// Offers element-level operations on one set and set algebra against other sets.
    /// </summary>
    public interface ISetStorageService : iToolServiceWithStringKey
    {
        /// <summary>
        /// Gets the state object held by this set.
        /// </summary>
        /// <returns>The current <see cref="SetState"/>.</returns>
        Task<SetState> GetStateAsync();
        /// <summary>
        /// Gets the version number stored in this set's state.
        /// </summary>
        /// <returns>The current version value.</returns>
        Task<int> GetVersionAsync();

        /// <summary>
        /// Checks whether an element is a member of this set.
        /// </summary>
        /// <param name="bytes">The element to look for.</param>
        /// <returns><c>true</c> when the element is present; otherwise <c>false</c>.</returns>
        Task<bool> ExistsAsync(string bytes);
        /// <summary>
        /// Gets all elements of this set.
        /// </summary>
        /// <returns>The elements currently held by this set.</returns>
        Task<ICollection<string>> GetAsync();
        /// <summary>
        /// Adds an element to this set.
        /// </summary>
        /// <param name="bytes">The element to add.</param>
        Task SetAsync(string bytes);
        /// <summary>
        /// Removes a single element from this set.
        /// </summary>
        /// <param name="bytes">The element to remove.</param>
        Task RemoveAsync(string bytes);
        /// <summary>
        /// Removes the whole set by clearing its state.
        /// </summary>
        Task RemoveAsync();

        /// <summary>
        /// Computes the union of this set with other sets.
        /// </summary>
        /// <param name="keys">Keys identifying the other sets to combine with this one.</param>
        /// <returns>The elements of the union.</returns>
        Task<ICollection<string>> GetUnionAsync(string[] keys);

        /// <summary>
        /// Computes the intersection of this set with other sets.
        /// </summary>
        /// <param name="keys">Keys identifying the other sets to intersect with this one.</param>
        /// <returns>The elements of the intersection.</returns>
        Task<ICollection<string>> GetIntersectAsync(string[] keys);

        /// <summary>
        /// Computes the difference between this set and other sets.
        /// </summary>
        /// <param name="keys">Keys identifying the other sets whose elements are subtracted from this one.</param>
        /// <returns>The elements of the difference.</returns>
        Task<ICollection<string>> GetDifferencesAsync(string[] keys);
    }

    [StorageProvider(ProviderName = "SetStorageService")]
    public class SetStorageService : Grain<SetState>, ISetStorageService, IGrainStorage
    {
        // ADO.NET settings supplied through the constructor.
        private readonly AdoNetOptions options;

        // Connection string taken from the options; every SqlConnection in this class is opened with it.
        private string connection;

        /// <summary>
        /// Creates the service and resolves its connection string from <paramref name="options"/>.
        /// </summary>
        /// <param name="options">ADO.NET options that provide the connection string.</param>
        public SetStorageService(AdoNetOptions options)
        {
            this.options = options;
            this.connection = options.GetConnection();
        }


        /// <summary>
        /// Removes a single element from this set: the row is deleted from the database first,
        /// then the element is dropped from the in-memory state and the version is incremented.
        /// Does nothing when the element is not a member.
        /// </summary>
        /// <param name="bytes">The element to remove.</param>
        /// <returns>A task that completes once the delete has run and the state has been updated.</returns>
        public async Task RemoveAsync(string bytes)
        {
            var members = this.State.keyValuePairs;

            // Single lookup: fetch the cached row id while checking membership.
            if (members == null || !members.TryGetValue(bytes, out int id))
            {
                return;
            }

            // Delete from the database before touching memory, so that a failed delete
            // leaves the in-memory set and its version unchanged.
            if (id > 0)
            {
                await this.ClearStateAsync(id);
            }
            else
            {
                // No usable row id is cached for this element: delete by (set id, value) instead.
                await this.ClearStateAsync(bytes);
            }

            members.TryRemove(bytes, out _);
            this.State.Version++;
        }

        /// <summary>
        /// Removes the whole set by invoking the parameterless <c>ClearStateAsync</c>
        /// (not declared in this class, so presumably the one inherited from the grain base class).
        /// </summary>
        /// <returns>A task that completes when the clear operation has finished.</returns>
        public async Task RemoveAsync() => await this.ClearStateAsync();

        /// <summary>
        /// Adds an element to this set: the row is inserted into the database first, then the
        /// element is recorded in memory with its row id and the version is incremented.
        /// Does nothing when the element is already present or when the state has no dictionary.
        /// </summary>
        /// <param name="bytes">The element to add.</param>
        /// <returns>A task that completes once the insert has run and the state has been updated.</returns>
        public async Task SetAsync(string bytes)
        {
            var members = this.State.keyValuePairs;

            // Same outcome as the former "!dict?.ContainsKey(x) == true" test, spelled out:
            // a missing dictionary or an existing element both mean there is nothing to add.
            if (members == null || members.ContainsKey(bytes))
            {
                return;
            }

            // Persist first; only a successful insert changes the in-memory set and its version.
            var id = await this.WriteStateAsync(bytes);
            members.TryAdd(bytes, id);
            this.State.Version++;
        }

        /// <summary>
        /// Checks whether an element is currently a member of this set.
        /// </summary>
        /// <param name="bytes">The element to look for.</param>
        /// <returns><c>true</c> when the in-memory dictionary exists and contains the element.</returns>
        public Task<bool> ExistsAsync(string bytes)
        {
            var members = this.State.keyValuePairs;
            bool found = members != null && members.ContainsKey(bytes);
            return Task.FromResult(found);
        }

        /// <summary>
        /// Gets all elements of this set.
        /// </summary>
        /// <returns>
        /// The keys of the in-memory dictionary, or an empty collection (never <c>null</c>)
        /// when the state has no dictionary.
        /// </returns>
        public Task<ICollection<string>> GetAsync()
        {
            ICollection<string> members = this.State.keyValuePairs?.Keys;

            // Callers read .Count on the result, so hand back an empty collection instead of null.
            return Task.FromResult(members ?? new List<string>());
        }

        /// <summary>
        /// Computes the elements of this set that are not contained in any of the other sets.
        /// </summary>
        /// <param name="keys">
        /// Keys of the sets to subtract. Entries equal to this grain's own key are skipped.
        /// </param>
        /// <returns>
        /// The remaining elements. Empty when this set is empty or when <paramref name="keys"/>
        /// is null or empty (the empty-keys result matches the previous behaviour).
        /// </returns>
        public async Task<ICollection<string>> GetDifferencesAsync(string[] keys)
        {
            var result = new List<string>();
            var members = this.State.keyValuePairs;

            if (members == null || members.IsEmpty || keys == null || keys.Length == 0)
            {
                return result;
            }

            result = members.Keys.ToList();
            var primaryKey = this.GetPrimaryKeyString();

            foreach (var key in keys)
            {
                // Never call back into this grain.
                if (key == primaryKey)
                {
                    continue;
                }

                var other = await this.GrainFactory.GetGrain<ISetStorageService>(key).GetAsync();

                // The other set may come back null or empty; either way there is nothing to subtract.
                if (other == null || other.Count == 0)
                {
                    continue;
                }

                // Hash lookup keeps each subtraction linear instead of quadratic.
                var lookup = new HashSet<string>(other);
                result.RemoveAll(item => lookup.Contains(item));

                if (result.Count == 0)
                {
                    break;
                }
            }

            return result;
        }

        /// <summary>
        /// Computes the elements of this set that are also contained in every one of the other sets.
        /// </summary>
        /// <param name="keys">
        /// Keys of the sets to intersect with. Entries equal to this grain's own key are skipped,
        /// because intersecting a set with itself changes nothing.
        /// </param>
        /// <returns>
        /// The common elements, without duplicates. Empty when this set is empty, when
        /// <paramref name="keys"/> is null or empty, or when any of the other sets is empty.
        /// </returns>
        public async Task<ICollection<string>> GetIntersectAsync(string[] keys)
        {
            var result = new List<string>();
            var members = this.State.keyValuePairs;

            if (members == null || members.IsEmpty || keys == null || keys.Length == 0)
            {
                return result;
            }

            // Start from this set and narrow it down with each other set. The previous code
            // concatenated the pairwise intersections, which produced duplicates and kept
            // elements that were missing from some of the sets.
            result = members.Keys.ToList();
            var primaryKey = this.GetPrimaryKeyString();

            foreach (var key in keys)
            {
                // Never call back into this grain.
                if (key == primaryKey)
                {
                    continue;
                }

                var other = await this.GrainFactory.GetGrain<ISetStorageService>(key).GetAsync();

                // Intersecting with an empty (or missing) set leaves nothing.
                if (other == null || other.Count == 0)
                {
                    result.Clear();
                    break;
                }

                var lookup = new HashSet<string>(other);
                result.RemoveAll(item => !lookup.Contains(item));

                if (result.Count == 0)
                {
                    break;
                }
            }

            return result;
        }

        /// <summary>
        /// Computes the union of this set with the other sets.
        /// </summary>
        /// <param name="keys">
        /// Keys of the sets to merge in. Entries equal to this grain's own key are skipped;
        /// a null or empty array yields just this set's elements.
        /// </param>
        /// <returns>The distinct elements of all sets, in first-seen order.</returns>
        public async Task<ICollection<string>> GetUnionAsync(string[] keys)
        {
            var result = this.State.keyValuePairs?.Keys.ToList() ?? new List<string>();

            if (keys != null && keys.Length > 0)
            {
                var primaryKey = this.GetPrimaryKeyString();
                foreach (var key in keys)
                {
                    // Never call back into this grain; its elements are already in the result.
                    if (key == primaryKey)
                    {
                        continue;
                    }

                    var other = await this.GrainFactory.GetGrain<ISetStorageService>(key).GetAsync();

                    // The other set may come back null or empty; append in place otherwise.
                    if (other != null && other.Count > 0)
                    {
                        result.AddRange(other);
                    }
                }
            }

            return result.Distinct().ToList();
        }



        /// <summary>
        /// Storage-provider hook that deletes a set and all of its elements from the database.
        /// The set row is looked up by the grain's primary key string; when no row is found
        /// nothing is deleted.
        /// </summary>
        /// <param name="grainType">Not used by this implementation.</param>
        /// <param name="grainReference">Reference whose primary key string identifies the set row.</param>
        /// <param name="grainState">Not used by this implementation.</param>
        /// <returns>A task that completes when the delete statements have run.</returns>
        public async Task ClearStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
        {
            const string findSetSql = "SELECT top 1 id from ClusterCacheSetTable where [key] = @key";
            const string deleteElementsSql = "DELETE ClusterCacheSetValueTable where listID = @id";
            const string deleteSetSql = "DELETE ClusterCacheSetTable where [key] = @key";

            await using SqlConnection conn = new SqlConnection(this.connection);
            await conn.OpenAsync();

            int setId;
            await using (var findSet = new SqlCommand(findSetSql, conn))
            {
                findSet.Parameters.Add(new SqlParameter("key", grainReference.GetPrimaryKeyString()));
                object scalar = await findSet.ExecuteScalarAsync();

                // A missing row yields null, which parses to 0.
                int.TryParse((scalar ?? "").ToString(), out setId);
            }

            if (setId <= 0)
            {
                return;
            }

            // Elements are deleted first, then the set row itself.
            await using (var deleteElements = new SqlCommand(deleteElementsSql, conn))
            {
                deleteElements.Parameters.Add(new SqlParameter("id", setId));
                await deleteElements.ExecuteNonQueryAsync();
            }

            await using (var deleteSet = new SqlCommand(deleteSetSql, conn))
            {
                deleteSet.Parameters.Add(new SqlParameter("key", grainReference.GetPrimaryKeyString()));
                await deleteSet.ExecuteNonQueryAsync();
            }
        }

        /// <summary>
        /// Storage-provider hook that loads a set from the database into <paramref name="grainState"/>.
        /// When a row exists for the grain's key, all of its elements are read; otherwise a new
        /// set row is inserted and the state starts out empty.
        /// </summary>
        /// <param name="grainType">Not used by this implementation.</param>
        /// <param name="grainReference">Reference whose primary key string identifies the set row.</param>
        /// <param name="grainState">Receives a fresh <see cref="SetState"/> in its <c>State</c> property.</param>
        /// <returns>A task that completes once the state has been assigned.</returns>
        public async Task ReadStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
        {
            string key = grainReference.GetPrimaryKeyString();

            await using (SqlConnection conn = new SqlConnection(this.connection))
            {
                string read = "SELECT top 1 id from ClusterCacheSetTable where [key] = @key";
                string readValue = "SELECT value,id FROM ClusterCacheSetValueTable where listID = @id";

                await conn.OpenAsync();

                int id = 0;
                await using (SqlCommand command = new SqlCommand(read, conn))
                {
                    command.Parameters.Add(new SqlParameter("key", key));

                    // A missing row yields null, which parses to 0.
                    int.TryParse(((await command.ExecuteScalarAsync()) ?? "").ToString(), out id);
                }

                if (id > 0)
                {
                    await using (SqlCommand command = new SqlCommand(readValue, conn))
                    {
                        var keyValues = new ConcurrentDictionary<string, int>();

                        command.Parameters.Add(new SqlParameter("id", id));

                        // Dispose the reader explicitly instead of relying on the connection closing.
                        using (var reader = await command.ExecuteReaderAsync())
                        {
                            while (await reader.ReadAsync())
                            {
                                // A NULL (or non-string) value cannot be a dictionary key; skip the row
                                // rather than letting TryAdd(null, ...) throw and abort the whole load.
                                if (reader["value"] is string value)
                                {
                                    keyValues.TryAdd(value, (int)reader["id"]);
                                }
                            }
                        }

                        grainState.State = new SetState
                        {
                            Id = id,
                            keyValuePairs = keyValues
                        };
                    }
                }
                else
                {
                    // SCOPE_IDENTITY() returns the identity generated by this insert only, whereas
                    // @@IDENTITY can return a value produced by a trigger on the table.
                    await using (SqlCommand command = new SqlCommand("insert into ClusterCacheSetTable([key]) values(@key); select @identityV=SCOPE_IDENTITY()", conn))
                    {
                        var output = new SqlParameter("@identityV", System.Data.SqlDbType.Int);
                        output.Direction = System.Data.ParameterDirection.Output;
                        command.Parameters.Add(new SqlParameter("key", key));
                        command.Parameters.Add(output);
                        await command.ExecuteNonQueryAsync();

                        grainState.State = new SetState
                        {
                            Id = (int)output.Value,
                            keyValuePairs = new ConcurrentDictionary<string, int>()
                        };
                    }
                }
            }
        }

        /// <summary>
        /// Storage-provider write hook. This implementation performs no work and completes
        /// immediately; elements are inserted one at a time by <c>WriteStateAsync(string)</c>.
        /// </summary>
        /// <param name="grainType">Not used by this implementation.</param>
        /// <param name="grainReference">Not used by this implementation.</param>
        /// <param name="grainState">Not used by this implementation.</param>
        /// <returns>An already completed task.</returns>
        public Task WriteStateAsync(string grainType, GrainReference grainReference, IGrainState grainState)
            => Task.CompletedTask;

        /// <summary>
        /// Deletes a single element row from ClusterCacheSetValueTable by its row id.
        /// </summary>
        /// <param name="id">The id of the element row to delete.</param>
        /// <returns>A task that completes when the delete statement has run.</returns>
        public async Task ClearStateAsync(int id)
        {
            const string sql = "delete ClusterCacheSetValueTable where id = @id";

            await using var conn = new SqlConnection(this.connection);
            await using var deleteRow = new SqlCommand(sql, conn);
            deleteRow.Parameters.Add(new SqlParameter("id", id));

            await conn.OpenAsync();
            await deleteRow.ExecuteNonQueryAsync();
        }
        /// <summary>
        /// Deletes an element from ClusterCacheSetValueTable by its value, restricted to the
        /// rows whose listID matches this set's id.
        /// </summary>
        /// <param name="paload">The element value to delete.</param>
        /// <returns>A task that completes when the delete statement has run.</returns>
        public async Task ClearStateAsync(string paload)
        {
            const string sql = "delete ClusterCacheSetValueTable where listID=@id and value = @paload";

            await using var conn = new SqlConnection(this.connection);
            await using var deleteByValue = new SqlCommand(sql, conn);
            deleteByValue.Parameters.Add(new SqlParameter("paload", paload));
            deleteByValue.Parameters.Add(new SqlParameter("id", this.State.Id));

            await conn.OpenAsync();
            await deleteByValue.ExecuteNonQueryAsync();
        }
        /// <summary>
        /// Inserts one element row for this set into ClusterCacheSetValueTable.
        /// </summary>
        /// <param name="paload">The element value to insert.</param>
        /// <returns>The identity value generated for the inserted row.</returns>
        public async Task<int> WriteStateAsync(string paload)
        {
            // SCOPE_IDENTITY() returns the identity generated by this insert only, whereas
            // @@IDENTITY can return a value produced by a trigger on the table.
            const string sql = "insert into ClusterCacheSetValueTable(listID,value) values(@listID,@value);select @identityV=SCOPE_IDENTITY()";

            await using (SqlConnection conn = new SqlConnection(this.connection))
            {
                await using (SqlCommand command = new SqlCommand(sql, conn))
                {
                    var output = new SqlParameter("@identityV", System.Data.SqlDbType.Int);
                    output.Direction = System.Data.ParameterDirection.Output;

                    command.Parameters.Add(new SqlParameter("value", paload));
                    command.Parameters.Add(new SqlParameter("listID", this.State.Id));
                    command.Parameters.Add(output);

                    await conn.OpenAsync();
                    await command.ExecuteNonQueryAsync();
                    return (int)output.Value;
                }
            }
        }

        /// <summary>
        /// Gets the state object currently held by this grain.
        /// </summary>
        /// <returns>A completed task carrying the current <see cref="SetState"/>.</returns>
        public Task<SetState> GetStateAsync() => Task.FromResult(this.State);

        /// <summary>
        /// Gets the version number stored in the current state.
        /// </summary>
        /// <returns>A completed task carrying the current version value.</returns>
        public Task<int> GetVersionAsync()
        {
            int version = this.State.Version;
            return Task.FromResult(version);
        }
    }

    /// <summary>
    /// In-memory state of one set grain.
    /// </summary>
    public class SetState 
    {
        /// <summary>
        /// Id of the set's row in ClusterCacheSetTable; used as listID for its element rows.
        /// </summary>
        public int Id { get; set; }
        /// <summary>
        /// Counter incremented by the grain when a single element is added or removed.
        /// NOTE(review): none of the SQL in this file stores it, so it appears to be in-memory only - confirm.
        /// </summary>
        public int Version { get; set; }
        /// <summary>
        /// Elements of the set, mapping each element value to the id of its row in ClusterCacheSetValueTable.
        /// May be null; the grain null-checks it before use.
        /// </summary>
        public ConcurrentDictionary<string, int> keyValuePairs { get; set; }
    }
}
